![[Amazon SageMaker] 画像合成によるデータセット作成時における背景の扱いについて](https://devio2023-media.developers.io/wp-content/uploads/2019/05/amazon-sagemaker.png)
[Amazon SageMaker] 画像合成によるデータセット作成時における背景の扱いについて
1 はじめに
2 単一商品のデータセット
3 複数商品のデータセット
そこで、複数の商品を背景に合成してデータセットを作成するようにしました。 ランダムに商品画像を取り出し、完全に重ならないように配置して1つのデータとしています。
4 コード
""" 下記を参考にさせて頂きました。 https://github.com/aws-samples/smart-cooler/blob/master/_ml_model_package/synthetic-dataset.ipynb """ import json import glob import random import os import shutil import math import numpy as np import cv2 from PIL import Image MAX = 3000 # 生成する画像数 CLASS_NAME=["jagabee","chipstar","butamen","kyo_udon","koara","curry"] COLORS = [(0,0,175),(175,0,0),(0,175,0),(175,175,0),(0,175,175),(175,175,175)] SIZE=[150, 150, 150 ,185, 150, 185] BACK_PATH = "./backgrounds" PRODUCT_PATH = "./products" OUTPUT_PATH = "./output" S3Bucket = "s3://ground_truth_dataset" manifestFile = "output.manifest" def euler_to_mat(yaw, pitch, roll): c, s = math.cos(yaw), math.sin(yaw) M = np.matrix([[ c, 0., s], [ 0., 1., 0.], [ -s, 0., c]]) c, s = math.cos(pitch), math.sin(pitch) M = np.matrix([[ 1., 0., 0.], [ 0., c, -s], [ 0., s, c]]) * M c, s = math.cos(roll), math.sin(roll) M = np.matrix([[ c, -s, 0.], [ s, c, 0.], [ 0., 0., 1.]]) * M return M def make_affine_transform(from_shape, to_shape, min_scale, max_scale, scale_variation=1.0, rotation_variation=1.0, translation_variation=1.0): from_size = np.array([[from_shape[1], from_shape[0]]]).T to_size = np.array([[to_shape[1], to_shape[0]]]).T scale = random.uniform((min_scale + max_scale) * 0.5 - (max_scale - min_scale) * 0.5 * scale_variation, (min_scale + max_scale) * 0.5 + (max_scale - min_scale) * 0.5 * scale_variation) roll = random.uniform(-1.0, 1.0) * rotation_variation pitch = random.uniform(-0.15, 0.15) * rotation_variation yaw = random.uniform(-0.15, 0.15) * rotation_variation M = euler_to_mat(yaw, pitch, roll)[:2, :2] h = from_shape[0] w = from_shape[1] corners = np.matrix([[-w, +w, -w, +w], [-h, -h, +h, +h]]) * 0.5 skewed_size = np.array(np.max(M * corners, axis=1) - np.min(M * corners, axis=1)) scale *= np.min(to_size / skewed_size) trans = (np.random.random((2,1)) - 0.5) * translation_variation trans = ((2.0 * trans) ** 5.0) / 2.0 trans = (to_size - skewed_size * scale) * trans center_to = to_size / 2. center_from = from_size / 2. M = euler_to_mat(yaw, pitch, roll)[:2, :2] M *= scale M = np.hstack([M, trans + center_to - M * center_from]) return M # アフィン変換 def transform(backImage, productImage, productSize): M = make_affine_transform( from_shape=productImage.shape, to_shape=backImage.shape, min_scale=1.0, max_scale=1.0, rotation_variation=3.5, # scale_variation=2.0, scale_variation=1.0, translation_variation=0.98) object_topleft = tuple(M.dot(np.array((0, 0) + (1, ))).tolist()[0]) object_topright = tuple(M.dot(np.array((productSize, 0) + (1,))).tolist()[0]) object_bottomleft = tuple(M.dot(np.array((0,productSize) + (1,))).tolist()[0]) object_bottomright = tuple(M.dot(np.array((productSize, productSize) + (1,))).tolist()[0]) object_tups = (object_topleft, object_topright, object_bottomleft, object_bottomright) object_xmin = (min(object_tups, key=lambda item:item[0])[0]) object_xmax = (max(object_tups, key=lambda item:item[0])[0]) object_ymin = (min(object_tups, key=lambda item:item[1])[1]) object_ymax = (max(object_tups, key=lambda item:item[1])[1]) rect = ((int(object_xmin),int(object_ymin)),(int(object_xmax),int(object_ymax))) productImage = cv2.warpAffine(productImage, M, (backImage.shape[1], backImage.shape[0])) return productImage, rect # 背景と商品の合成 def margeImage(backImg, productImg): # PIL形式で重ねる back_pil = Image.fromarray(backImg) product_pil = Image.fromarray(productImg) back_pil.paste(product_pil, (0, 0), product_pil) return np.array(back_pil) # エフェクト(Gauss) def addGauss(img, level): return cv2.blur(img, (level * 2 + 1, level * 2 + 1)) # エフェクト(Noise) def addNoiseSingleChannel(single): diff = 255 - single.max() noise = np.random.normal(0, random.randint(1, 100), single.shape) noise = (noise - noise.min())/(noise.max()-noise.min()) noise= diff*noise noise= noise.astype(np.uint8) dst = single + noise return dst # エフェクト(Noise) def addNoise(img): img = img.astype('float64') img[:,:,0] = addNoiseSingleChannel(img[:,:,0]) img[:,:,1] = addNoiseSingleChannel(img[:,:,1]) img[:,:,2] = addNoiseSingleChannel(img[:,:,2]) return img.astype('uint8') # バウンディングボックス描画 def box(frame, rect, class_id): ((x1,y1),(x2,y2)) = rect label = "{}".format(CLASS_NAME[class_id]) img = cv2.rectangle(frame,(x1, y1), (x2, y2), COLORS[class_id],2) img = cv2.rectangle(img,(x1, y1), (x1 + 150,y1-20), COLORS[class_id], -1) cv2.putText(img,label,(x1+2, y1-2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA) return img # Manifest生成クラス class Manifest: def __init__(self, class_name): self.__lines = '' self.__class_map={} for i in range(len(class_name)): self.__class_map[str(i)] = class_name[i] def appned(self, fileName, data, height, width): date = "0000-00-00T00:00:00.000000" line = { "source-ref": "{}/{}".format(S3Bucket, fileName), "boxlabel": { "image_size": [ { "width": width, "height": height, "depth": 3 } ], "annotations": [] }, "boxlabel-metadata": { "job-name": "xxxxxxx", "class-map": self.__class_map, "human-annotated": "yes", "objects": { "confidence": 1 }, "creation-date": date, "type": "groundtruth/object-detection" } } for i in range(data.max()): (_, rect, class_id) = data.get(i) ((x1,y1),(x2,y2)) = rect line["boxlabel"]["annotations"].append({ "class_id": class_id, "width": x2 - x1, "top": y1, "height": y2 - y1, "left": x1 }) self.__lines += json.dumps(line) + '\n' def get(self): return self.__lines # 背景画像生成クラス class Backgrounds: def __init__(self, backPath): self.__backPath = backPath def get(self): imagePath = random.choice(glob.glob(self.__backPath + '/*.png')) return cv2.imread(imagePath, cv2.IMREAD_UNCHANGED) # 商品画像生成クラス class Products: def __init__(self, productPath, class_name, size): self.__productPath = productPath self.__class_name = class_name self.__size = size def get(self, class_id): # 商品画像 class_name = self.__class_name[class_id] image_path = random.choice(glob.glob(self.__productPath + '/' + class_name + '/*.png')) product_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED) # 商品画像のサイズ size = self.__size[class_id] return (self.__resize(product_image, size), size, class_id) # 商品画像のサイズ調整 def __resize(self, img, size): org_h, org_w = img.shape[:2] imageArray = np.zeros((org_h, org_w, 4), np.uint8) img = cv2.resize(img, (size, size)) imageArray[0:size, 0:size] = img return imageArray # 1画像分のデータを保持するクラス class Data: def __init__(self, rate): self.__rects = [] self.__images = [] self.__class_ids = [] self.__rate = rate def get_class_ids(self): return self.__class_ids def max(self): return len(self.__rects) def get(self, i): return (self.__images[i], self.__rects[i], self.__class_ids[i]) # 追加(重複率が指定値以上の場合は失敗する) def append(self, productImage, rect, class_id): conflict = False for i in range(len(self.__rects)): iou = self.__multiplicity(self.__rects[i], rect) if(iou > self.__rate): conflict = True break if(conflict == False): self.__rects.append(rect) self.__images.append(productImage) self.__class_ids.append(class_id) return True return False # 重複率 def __multiplicity(self, a, b): (ax_mn, ay_mn) = a[0] (ax_mx, ay_mx) = a[1] (bx_mn, by_mn) = b[0] (bx_mx, by_mx) = b[1] a_area = (ax_mx - ax_mn + 1) * (ay_mx - ay_mn + 1) b_area = (bx_mx - bx_mn + 1) * (by_mx - by_mn + 1) abx_mn = max(ax_mn, bx_mn) aby_mn = max(ay_mn, by_mn) abx_mx = min(ax_mx, bx_mx) aby_mx = min(ay_mx, by_mx) w = max(0, abx_mx - abx_mn + 1) h = max(0, aby_mx - aby_mn + 1) intersect = w*h return intersect / (a_area + b_area - intersect) # 各クラスのデータ数が同一になるようにカウントする class Counter(): def __init__(self, max): self.__counter = np.zeros(max) def get(self): n = np.argmin(self.__counter) self.__counter[n] += 1 return int(n) def print(self): print(self.__counter) def main(): # 出力先の初期化 if os.path.exists(OUTPUT_PATH): shutil.rmtree(OUTPUT_PATH) os.mkdir(OUTPUT_PATH) backgrounds = Backgrounds(BACK_PATH) products = Products(PRODUCT_PATH, CLASS_NAME, SIZE) manifest = Manifest(CLASS_NAME) counter = Counter(len(CLASS_NAME)) no = 0 while(True): # 背景画像の取得 backImage = backgrounds.get() # 商品データ data = Data(0.15) for _ in range(20): class_id = counter.get() # class_id = random.randint(0, len(CLASS_NAME)-1) # 商品画像の取得 product_image, product_size, class_id = products.get(class_id) # アフィン変換 product_image, rect = transform(backImage, product_image, product_size) # 商品の追加(重複した場合は、失敗する) data.append(product_image, rect, class_id) frame = backImage for index in range(data.max()): (product_image, _, _) = data.get(index) # 合成 frame = margeImage(frame, product_image) # アルファチャンネル削除 frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR) # エフェクト(gauss) frame = addGauss(frame, random.randint(0, 2)) # エフェクト(Noise) frame = addNoise(frame) # 画像名 fileName = "{:04d}.png".format(no) no+=1 # 画像保存 cv2.imwrite("{}/{}".format(OUTPUT_PATH, fileName), frame) # manifest追加 manifest.appned(fileName, data, frame.shape[0], frame.shape[1]) for i in range(data.max()): (_, rect, class_id) = data.get(i) # バウンディングボックス描画(確認用) frame = box(frame, rect, class_id) counter.print() if(MAX < no): break # 表示(確認用) cv2.imshow("frame", frame) cv2.waitKey(1) # manifest 保存 with open('{}/{}'.format(OUTPUT_PATH, manifestFile), 'w') as f: f.write(manifest.get()) main()
5 RecordIO形式
上記で作成したAmazon SageMaker Ground Truth形式のデータをim2rec.pyを利用してRecordIO形式に変換しているコードです。
curl -O https://raw.githubusercontent.com/apache/incubator-mxnet/master/tools/im2rec.py
import json import os import subprocess # 定義 inputPath = '/tmp/input' outputPath = '/tmp/output' manifest = 'output.manifest' CLASS_NAME=["jagabee","chipstar","butamen","kyo_udon","koara","curry"] # 分割の比率 ratio = 0.8 # 8:2に分割する # 1件のデータを表現するクラス class Data(): def __init__(self, src): # プロジェクト名の取得 for key in src.keys(): index = key.rfind("-metadata") if(index!=-1): projectName = key[0:index] # メタデータの取得 metadata = src[projectName + '-metadata'] class_map = metadata["class-map"] # 画像名の取得 self.__imgFileName = os.path.basename(src["source-ref"]) # 画像サイズの取得 project = src[projectName] image_size = project["image_size"] self.__img_width = image_size[0]["width"] self.__img_height = image_size[0]["height"] self.__annotations = [] # アノテーションの取得 for annotation in project["annotations"]: class_id = annotation["class_id"] top = annotation["top"] left = annotation["left"] width = annotation["width"] height = annotation["height"] self.__annotations.append({ "label": class_map[str(class_id)], "width": width, "top": top, "height": height, "left": left }) @property def annotations(self): return self.__annotations # 指定されたラベルを含むかどうか def exsists(self, label): for annotation in self.__annotations: if(annotation["label"] == label): return True return False # .lstを生成して追加する def appendLst(self, lst, cls_list): index = len(lst.split('\n')) headerSize = 4 # hederSize,dataSize,imageWidth,imageHeight dataSize = 5 str = "{}\t{}\t{}\t{}\t{}".format(index, headerSize, dataSize, self.__img_width, self.__img_height) for annotation in self.__annotations: cls_id = cls_list.index(annotation["label"]) left = annotation["left"] right = left + annotation["width"] top = annotation["top"] bottom = top + annotation["height"] left = round(left / self.__img_width, 3) right = round(right / self.__img_width, 3) top = round(top / self.__img_height, 3) bottom = round(bottom / self.__img_height, 3) str += "\t{}\t{}\t{}\t{}\t{}".format(cls_id, left, top, right, bottom) fileName = self.__imgFileName str += "\t{}".format(fileName) lst += str + "\n" return lst # 全てのJSONデータを読み込む def getDataList(inputPath, manifest): dataList = [] with open("{}/{}".format(inputPath, manifest), 'r') as f: srcList = f.read().split('\n') for src in srcList: if(src != ''): json_src = json.loads(src) dataList.append(Data(json.loads(src))) return dataList def main(): # 出力先フォルダ生成 os.makedirs(outputPath, exist_ok=True) # 全てのJSONデータを読み込む dataList = getDataList(inputPath, manifest) total_len = len(dataList) train_len = int(total_len * ratio) print("全データ: {}件 train: {} validation: {}".format(total_len, train_len, total_len-train_len)) # .lst形式 train = '' validation = '' for i in range(train_len): data = dataList.pop() train = data.appendLst(train, CLASS_NAME) for data in dataList: validation = data.appendLst(validation, CLASS_NAME) # .lstファイルの生成 trainLst = "{}/train.lst".format(outputPath) validationLst = "{}/validation.lst".format(outputPath) with open(trainLst, mode='w') as f: f.write(train) with open(validationLst, mode='w') as f: f.write(validation) # im2rec.pyによるRecordIOファイル生成 # python im2rec.py --pack-label <your_lst_file_name> <your_image_folder> im2rec = "{}/im2rec.py".format(os.getcwd()) cmd = ["python3", im2rec, "--pack-label", "validation.lst", inputPath] result = subprocess.run(cmd, cwd=outputPath) print(result) cmd = ["python3", im2rec, "--pack-label", "train.lst", inputPath] result = subprocess.run(cmd, cwd=outputPath) print(result) main()
6 学習
epochs = 10 num_classes = 6 num_training_samples = 2400 od_model.set_hyperparameters(base_network='resnet-50', use_pretrained_model=1, num_classes=num_classes, mini_batch_size=32, epochs=epochs, learning_rate=0.001, lr_scheduler_step='3,6', lr_scheduler_factor=0.1, optimizer='sgd', momentum=0.9, weight_decay=0.0005, overlap_threshold=0.5, nms_threshold=0.45, image_shape=512, label_width=350, num_training_samples=num_training_samples)
7 最後に
今回は、合成によるデータセット作成での背景の扱いについての紹介でした。 合成も、実際にモデルを使用する場面で、どのような状況になるかを考えて行うことが重要だと思います。